/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
	"fmt"
	"time"

	federation_release_1_3 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
	v1 "k8s.io/kubernetes/pkg/api/v1"
	cache "k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/controller"

	"github.com/golang/glog"
)

// clusterEndpointWorker starts one goroutine per cluster currently present in
// sc.clusterCache.clientMap. Each goroutine repeatedly dequeues a key from that
// cluster's endpointQueue, passes it to syncEndpoint, and marks it done.
//
// A goroutine exits once its queue's Get reports quit. Clusters added to
// clientMap after this call are not picked up here.
func (sc *ServiceController) clusterEndpointWorker() {
	fedClient := sc.federationClient
	for clusterName, c := range sc.clusterCache.clientMap {
		go func(c *clusterCache, clusterName string) {
			for {
				key, quit := c.endpointQueue.Get()
				if quit {
					// Leave the loop entirely; continuing to call Get after quit
					// would just spin and keep this goroutine alive forever.
					return
				}
				// The closure scopes the defer so Done fires once per item,
				// not once when the goroutine finally exits.
				func() {
					defer c.endpointQueue.Done(key)
					err := sc.clusterCache.syncEndpoint(key.(string), clusterName, c, sc.serviceCache, fedClient, sc)
					if err != nil {
						glog.V(2).Infof("Failed to sync endpoint: %+v", err)
					}
				}()
			}
		}(c, clusterName)
	}
}

// syncEndpoint reconciles the federation service identified by key with the
// endpoint state stored for one cluster. key is the namespaced name of the
// endpoint.
//
// Keys that have no entry in serviceCache are ignored and nil is returned. If
// the store lookup itself fails, the key is re-added to the endpoint queue and
// the lookup error is returned. A stored object that is neither *v1.Endpoints
// nor a cache.DeletedFinalStateUnknown yields an error without requeueing.
// When the update/deletion processing fails, the key is re-added to the queue
// and nil is returned; the service's DNS update delay is reset either way.
func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, serviceController *ServiceController) error {
	cachedService, known := serviceCache.get(key)
	if !known {
		// Not tracked in the service cache: nothing to do for this key.
		return nil
	}

	obj, exists, err := clusterCache.endpointStore.GetByKey(key)
	if err != nil {
		glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err)
		clusterCache.endpointQueue.Add(key)
		return err
	}

	if !exists {
		// Nothing in the store for this key: treat it as a deletion.
		glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
		err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
	} else {
		switch endpoint := obj.(type) {
		case *v1.Endpoints:
			glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
			err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
		case cache.DeletedFinalStateUnknown:
			glog.Infof("Found tombstone for %v", key)
			err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
		default:
			return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
		}
	}

	if err != nil {
		// Processing failures are handled by requeueing rather than by
		// surfacing the error to the caller.
		glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
		clusterCache.endpointQueue.Add(key)
	}
	cachedService.resetDNSUpdateDelay()
	return nil
}

// processEndpointDeletion drops clusterName from cachedService.endpointMap and
// then asks the service controller to bring DNS records in line, retrying up
// to clientRetryCount times with cachedService.nextDNSUpdateDelay() between
// attempts.
//
// It returns nil when the cluster had no cached endpoint entry or when a DNS
// update attempt succeeds; otherwise it returns the error from the last failed
// attempt. cachedService.rwlock is held for the whole call, including the
// sleeps between retries.
//
// Note that the map entry is removed before DNS is updated, so a later call
// for the same cluster takes the "no cached entry" early return.
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
	glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
	cachedService.rwlock.Lock()
	defer cachedService.rwlock.Unlock()

	// TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist
	// need to query dns info from dnsprovider and make sure of if deletion is needed
	if _, ok := cachedService.endpointMap[clusterName]; !ok {
		return nil
	}

	// endpoints lost, clean dns record
	glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
	delete(cachedService.endpointMap, clusterName)

	// err is declared outside the loop and assigned with "=" so that the last
	// failure is what gets logged and returned (":=" here would shadow it).
	var err error
	for i := 0; i < clientRetryCount; i++ {
		if err = serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
			return nil
		}
		glog.V(4).Infof("Error ensuring DNS Records: %v", err)
		time.Sleep(cachedService.nextDNSUpdateDelay())
	}
	return err
}

// processEndpointUpdate updates DNS information when an endpoint update event
// is received for clusterName.
//
// The endpoint contents are not inspected beyond reachability: the endpoint is
// considered reachable when at least one subset has a non-empty Addresses
// list. Only transitions trigger work:
//   - reachable and not yet in cachedService.endpointMap: the cluster is added
//     to the map and DNS records are ensured;
//   - not reachable but present in the map: the cluster is removed from the
//     map and DNS records are ensured;
//   - otherwise nothing changes and nil is returned.
//
// If ensuring DNS records fails, it is retried up to clientRetryCount more
// times, sleeping cachedService.nextDNSUpdateDelay() before each retry. The
// error from the last failed attempt is returned. cachedService.rwlock is held
// for the whole call, including the sleeps.
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
	glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
	cachedService.rwlock.Lock()
	defer cachedService.rwlock.Unlock()

	reachable := false
	for _, subset := range endpoint.Subsets {
		if len(subset.Addresses) > 0 {
			reachable = true
			break
		}
	}

	_, cached := cachedService.endpointMap[clusterName]
	switch {
	case reachable && !cached:
		// first time get endpoints, update dns record
		glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
		cachedService.endpointMap[clusterName] = 1
	case !reachable && cached:
		// previously reachable endpoints are gone, update dns record
		glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
		delete(cachedService.endpointMap, clusterName)
	default:
		// Reachability did not change relative to the cached state.
		return nil
	}

	err := serviceController.ensureDnsRecords(clusterName, cachedService)
	if err == nil {
		return nil
	}
	glog.V(4).Infof("Error ensuring DNS Records: %v", err)
	for i := 0; i < clientRetryCount; i++ {
		time.Sleep(cachedService.nextDNSUpdateDelay())
		// Assign with "=" so the most recent failure is the one reported.
		if err = serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
			return nil
		}
		glog.V(4).Infof("Error ensuring DNS Records: %v", err)
	}
	return err
}

// enqueueEndpoint derives the queue key for obj via controller.KeyFunc and adds
// it to the endpoint queue of the named cluster. obj could be an endpoints
// object or a DeletionFinalStateUnknown marker item.
//
// If no key can be derived the failure is logged and nothing is queued. A
// clusterName that is not present in cc.clientMap is silently ignored.
func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	if target, known := cc.clientMap[clusterName]; known {
		target.endpointQueue.Add(key)
	}
}
